package com.flink_demo.demo.cep;

import java.util.List;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.IterativeCondition.Context;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.IngestionTimeExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import com.flink_demo.demo.cep.event.MonitorEvent;
import com.flink_demo.demo.cep.source.MonitorEventSource;

public class CepDemo {
	public static void main(String... args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		SingleOutputStreamOperator<MonitorEvent> inputEventStream = env.addSource(new MonitorEventSource())
				.assignTimestampsAndWatermarks(new IngestionTimeExtractor<MonitorEvent>());
		Pattern<MonitorEvent, MonitorEvent> pattern = Pattern.<MonitorEvent>begin("start")
				.where(new IterativeCondition<MonitorEvent>() {

					private static final long serialVersionUID = 1L;

					@Override
					public boolean filter(MonitorEvent value, Context<MonitorEvent> ctx) throws Exception {
						return value.getId() == 9;
						// return false;
					}
				})
				// .times(3)
				// .optional().greedy()
//				.within(Time.seconds(5))
				.followedBy("fm1").where(new IterativeCondition<MonitorEvent>() {

					private static final long serialVersionUID = 1L;

					@Override
					public boolean filter(MonitorEvent value, Context<MonitorEvent> ctx) throws Exception {
						return value.getValue() == 9;
						// return false;
					}
				}).next("middle").where(new IterativeCondition<MonitorEvent>() {

					private static final long serialVersionUID = 1L;

					@Override
					public boolean filter(MonitorEvent value, Context<MonitorEvent> ctx) throws Exception {
						return value.getValue() == 9;
						// return false;
					}
				}).within(Time.seconds(1))
		// .times(3)
		// .optional()
		// .greedy()
		;

		PatternStream<MonitorEvent> ps = CEP.pattern(inputEventStream.keyBy("id"), pattern);
		// ps.select(new PatternSelectFunction<MonitorEvent, String>() {
		// private static final long serialVersionUID = 1L;
		//
		// @Override
		// public String select(Map<String, List<MonitorEvent>> lists) throws
		// Exception {
		// // TODO Auto-generated method stub
		// return lists.toString();
		// }
		// }).print();
//		ps.select(new PatternSelectFunction<MonitorEvent, String>() {
//			private static final long serialVersionUID = 1L;
//
//			@Override
//			public String select(Map<String, List<MonitorEvent>> lists) throws Exception {
//				// TODO Auto-generated method stub
//				return lists.toString();
//			}
//		}).print();

		ps.select(new PatternTimeoutFunction<MonitorEvent, String>() {
			private static final long serialVersionUID = 1L;

			@Override
			public String timeout(Map<String, List<MonitorEvent>> pattern, long timeoutTimestamp) throws Exception {
				// TODO Auto-generated method stub
				return pattern.toString();
			}

		}, new PatternSelectFunction<MonitorEvent, String>() {
			private static final long serialVersionUID = 1L;

			@Override
			public String select(Map<String, List<MonitorEvent>> lists) throws Exception {
				// TODO Auto-generated method stub
				return lists.toString();
			}
		}).print();
//		 System.out.println(env.getExecutionPlan());
		env.execute();
	}
}
